/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;

/**
 * Default implementation of {@link MaxMessagesRecvByteBufAllocator} which
 * respects {@link ChannelConfig#isAutoRead()} and also prevents overflow.
 */
public abstract class DefaultMaxMessagesRecvByteBufAllocator
        implements MaxMessagesRecvByteBufAllocator
{
    private volatile int maxMessagesPerRead;

    public DefaultMaxMessagesRecvByteBufAllocator()
    {
        this(1);
    }

    public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead)
    {
        maxMessagesPerRead(maxMessagesPerRead);
    }

    @Override
    public int maxMessagesPerRead()
    {
        return maxMessagesPerRead;
    }

    @Override
    public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(
            int maxMessagesPerRead)
    {
        if (maxMessagesPerRead <= 0)
        {
            throw new IllegalArgumentException("maxMessagesPerRead: "
                    + maxMessagesPerRead + " (expected: > 0)");
        }
        this.maxMessagesPerRead = maxMessagesPerRead;
        return this;
    }

    /**
     * Focuses on enforcing the maximum messages per read condition for
     * {@link #continueReading()}.
     */
    public abstract class MaxMessageHandle implements ExtendedHandle
    {
        private ChannelConfig config;

        private int maxMessagePerRead;

        private int totalMessages;

        private int totalBytesRead;

        private int attemptedBytesRead;

        private int lastBytesRead;

        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier()
        {
            @Override
            public boolean get()
            {
                return attemptedBytesRead == lastBytesRead;
            }
        };

        /**
         * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
         */
        @Override
        public void reset(ChannelConfig config)
        {
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

        @Override
        public ByteBuf allocate(ByteBufAllocator alloc)
        {
            return alloc.ioBuffer(guess());
        }

        @Override
        public final void incMessagesRead(int amt)
        {
            totalMessages += amt;
        }

        @Override
        public final void lastBytesRead(int bytes)
        {
            lastBytesRead = bytes;
            if (bytes > 0)
            {
                totalBytesRead += bytes;
            }
        }

        @Override
        public final int lastBytesRead()
        {
            return lastBytesRead;
        }

        @Override
        public boolean continueReading()
        {
            return continueReading(defaultMaybeMoreSupplier);
        }

        @Override
        public boolean continueReading(
                UncheckedBooleanSupplier maybeMoreDataSupplier)
        {
            return config.isAutoRead() && maybeMoreDataSupplier.get()
                    && totalMessages < maxMessagePerRead && totalBytesRead > 0;
        }

        @Override
        public void readComplete()
        {
        }

        @Override
        public int attemptedBytesRead()
        {
            return attemptedBytesRead;
        }

        @Override
        public void attemptedBytesRead(int bytes)
        {
            attemptedBytesRead = bytes;
        }

        protected final int totalBytesRead()
        {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }
    }
}
